#include <list.h>
#include "config.h"

#define DBG_SUBSYS S_LIBSTORAGE

#include "sysy_lib.h"
#include "coroutine.h"
#include "list.h"

#include "lsv_wbuffer.h"
#include "lsv_wbuffer_internal.h"

// extern int _align_page_write(void *arg);

typedef struct {
        hashtable_t ht;
        uint64_t lba;
        count_list_t fifo;
        // int running;
        // lsv_rwlock_t lock;
        int __enter_count;
} lba_fifo_t;

static inline uint32_t __key_func(const void *k) {
        const uint64_t *lba = k;
        return (*lba) % 1048576;
}

static inline int __cmp_func(const void *s1, const void *s2) {
        const lba_fifo_t *e1 = s1;
        const uint64_t *lba = s2;
        if( e1->lba < *lba)
                return -1;
        else if (e1->lba > *lba)
                return 1;
        return 0;
}

int lba_fifo_init(hashtable_t *_ht) {
        hashtable_t ht;

        *_ht = NULL;

        ht = hash_create_table(__cmp_func, __key_func, "LBA");

        *_ht = ht;
        return 0;
}

int lba_fifo_destroy(hashtable_t ht) {
        // TODO
        (void) ht;
        return 0;
}

static inline void __free_func(void *arg) {
        lba_fifo_t *e = arg;
        static uint64_t free_count = 0;

        free_count++;
        DINFO_NOP("lba %llu %p free %llu\n", (LLU)e->lba, e, (LLU)free_count);

        YASSERT(e->fifo.count == 0);

        yfree((void **)&e);
}

#if 0
static int _mq_swf(void *arg) {
        // TODO if last one, delete from tree
        int ret;
        lba_fifo_t *fifo = arg;
        lba_fifo_t *e;

        YASSERT(fifo->fifo.count == 0);

        ret = hash_table_remove(fifo->ht, &fifo->lba, (void **)&e);
        YASSERT(ret == 0);
        YASSERT(fifo == e);
        __free_func(e);

        return 0;
}
#endif

int lba_fifo_insert_page(hashtable_t ht, wbuf_io_frag_t *io_frag) {
        int ret;
        lba_fifo_t *e;

        YASSERT(io_frag->lba % LSV_PAGE_SIZE == 0);

        e = hash_table_find(ht, (void *)&io_frag->lba);
        if (e == NULL) {
                ret = ymalloc((void **)&e, sizeof(lba_fifo_t));
                if (unlikely(ret))
                        YASSERT(0);

                e->ht = ht;
                e->lba = io_frag->lba;
                e->__enter_count = 0;
                // e->running = 0;
                // lsv_rwlock_init(&e->lock);

                count_list_init(&e->fifo);

                ret = hash_table_insert(ht, (void *)e, &e->lba, 0);
        }
        count_list_add_tail(&io_frag->hook, &e->fifo);

        DINFO_NOP("lba %llu io_frag %p total %u\n", (LLU)io_frag->lba, io_frag, e->fifo.count);

        return 0;
}

typedef struct {
        lsv_volume_proto_t *lsv_info;
        co_func_t func;
        wbuf_io_frag_t *io_frag;
        uint64_t lba;
} fifo_fill_ctx_t;


static int __do_fifo_fill(void *arg) {
        int ret;
        fifo_fill_ctx_t *ctx = arg;
        lsv_volume_proto_t *lsv_info = ctx->lsv_info;
        co_func_t func = ctx->func;
        wbuf_io_frag_t *io_frag = ctx->io_frag;
        uint64_t lba = ctx->lba;

        YASSERT(io_frag->lba == lba);

        DINFO_NOP("pop io %p lsn %llu buf %p %u lba %llu %llu %u chunk %p %u page %u snap %u\n",
               io_frag->io,
               (LLU)io_frag->io->lsn,
               io_frag->buf,
               io_frag->buf->len,
               (LLU)OFF2LBA(io_frag->off),
               (LLU)io_frag->off,
               io_frag->size,
               io_frag->chunk,
               io_frag->chunk->id,
               io_frag->page_idx,
               io_frag->snap_id);

        ltable_wrlock(&lsv_info->lock_table, lba);
        ret = func(io_frag);
        ltable_unlock(&lsv_info->lock_table, lba);
        if (unlikely(ret)) {
                // TODO error handling
                YASSERT(0);
        }

        io_frag->status = WBUF_IO_FRAG_FILL;

        return 0;
}

/**
 * @todo 重入时有问题，为保证LBA上FIFO执行的顺序，需要进一步的同步机制.
 *
 * double check
 * - e被改变
 *
 * 循环中的元素， 在并发情况下，可能属于多个io
 * 故元素必须包含所有必须的信息，只需传入LBA
 *
 * 先移入本地队列，故队列为空，不意味着执行完成
 *
 * @param ht
 * @param lsv_info
 * @param head
 * @param func
 * @param lba
 * @return
 */
int lba_fifo_fill_page(hashtable_t ht, co_func_t func, wbuf_io_frag_t *io_frag) {
        int ret;
        struct list_head *pos;
        wbuf_io_frag_t *io_frag2;
        uint64_t lba = io_frag->lba;

        YASSERT(io_frag->status == WBUF_IO_FRAG_PREPARE);

        lba_fifo_t *e, *e2;

        e = hash_table_find(ht, (void *)&lba);
        YASSERT(e != NULL);
        YASSERT(!list_empty_careful(&e->fifo.list));

        // 如果非首元素，则wait
        pos = e->fifo.list.next;
        io_frag2 = list_entry(pos, wbuf_io_frag_t, hook);
        if (io_frag != io_frag2) {
                DINFO("lba %llu io_frag %p wait\n", (LLU)lba, io_frag);
                io_frag->status = WBUF_IO_FRAG_WAIT;
                co_cond_wait(&io_frag->cond);

                // double check, NOT loop check
                // must be first
                pos = e->fifo.list.next;
                io_frag2 = list_entry(pos, wbuf_io_frag_t, hook);
                YASSERT(io_frag == io_frag2);
        }

        io_frag->__enter_count_fill ++;
        YASSERT(io_frag->__enter_count_fill == 1);

        e->__enter_count++;
        YASSERT(e->__enter_count == 1);

        {
                // 临界区
                fifo_fill_ctx_t ctx;
                ctx.lsv_info = io_frag->lsv_info;
                ctx.func = func;
                ctx.io_frag = io_frag;
                ctx.lba = lba;

                __do_fifo_fill(&ctx);

                YASSERT(io_frag->status == WBUF_IO_FRAG_FILL);

                // 第一任务会改变队列头部
                // YASSERT(pos == e->fifo.list.next);
                count_list_del_init(pos, &e->fifo);
        }

        // 在此期间，可能有新的记录加入
        DINFO_NOP("lba %llu io_frag %p left %u\n", (LLU)lba, io_frag, e->fifo.count);

        e2 = hash_table_find(ht, (void *)&lba);
        YASSERT(e2 == e);

        e->__enter_count--;

        // 先移入本地队列，故队列为空，不意味着执行完成
        if (list_empty_careful(&e->fifo.list)) {
                DINFO_NOP("free lba %llu\n", (LLU)e->lba);
                ret = hash_table_remove(e->ht, &e->lba, (void **)&e2);
                YASSERT(ret == 0);
                YASSERT(e2 = e);
                YASSERT(e2->fifo.count == 0);
                yfree((void **)&e2);
        } else {
                // wakeup next, if any
                pos = e->fifo.list.next;
                io_frag2 = list_entry(pos, wbuf_io_frag_t, hook);
                YASSERT(io_frag2->lba == lba);
                DINFO_NOP("wakeup next %llu io_frag %p next %p status %u\n", (LLU)lba, io_frag, io_frag2, io_frag2->status);
                if (io_frag2->status == WBUF_IO_FRAG_WAIT) {
                        co_cond_broadcast(&io_frag2->cond, 0);
                }
        }

        return 0;
}
